Maintain replication connection between sync flows #1211
Conversation
force-pushed from c936149 to 772def2
force-pushed from c00b723 to e14b538
Kevin pointed out a problem: in non-parallel sync-normalize, a long normalize will leave the connection silent for too long & postgres can drop the connection. We need keepalive logic. MaintainPull could work, but then we need to make sure to use synchronization between MaintainPull & StartFlow
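A minimal sketch of what such keepalive logic could look like, assuming a pglogrepl replication connection; the interval and the bare `StandbyStatusUpdate` are illustrative, not the actual MaintainPull implementation:

```go
import (
	"context"
	"time"

	"github.com/jackc/pglogrepl"
	"github.com/jackc/pgx/v5/pgconn"
)

// keepAlive periodically sends a standby status update so postgres does not
// drop an otherwise idle replication connection during a long normalize.
func keepAlive(ctx context.Context, conn *pgconn.PgConn, lastFlushed pglogrepl.LSN) error {
	ticker := time.NewTicker(30 * time.Second) // interval is an assumption
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			err := pglogrepl.SendStandbyStatusUpdate(ctx, conn,
				pglogrepl.StandbyStatusUpdate{WALWritePosition: lastFlushed})
			if err != nil {
				return err
			}
		}
	}
}
```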
a395c50
to
7cdb75b
Compare
@@ -45,9 +45,10 @@ type SlotSnapshotSignal struct {
 type FlowableActivity struct {
 	CatalogPool *pgxpool.Pool
 	Alerter     *alerting.Alerter
+	CdcCacheRw  sync.RWMutex
Maybe have a replication connection manager struct (sketched below) that takes care of:
- This map and locking.
- Keeping track of the connection health and lifecycle.
- Any other additional metadata pertaining to the replication connection.
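A rough sketch of what such a manager could look like, wrapping the existing `CdcCacheRw` pattern; the type and method names here are hypothetical:

```go
import "sync"

// Hypothetical replication connection manager: wraps the map and its lock so
// callers never touch the mutex directly, and gives health/lifecycle metadata
// a single home.
type ReplConnManager struct {
	mu    sync.RWMutex
	conns map[string]*replConnEntry // keyed by flow job name (assumption)
}

// replConnEntry is where connection health, lifecycle state, and any other
// replication metadata could live.
type replConnEntry struct {
	connector any // stand-in for the concrete CDC pull connector type
}

func (m *ReplConnManager) Get(flowName string) (*replConnEntry, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	entry, ok := m.conns[flowName]
	return entry, ok
}

func (m *ReplConnManager) Put(flowName string, entry *replConnEntry) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.conns[flowName] = entry
}
```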
Tracking connection health & lifecycle is part of MaintainPull, which needs to exist either way to keep heartbeating the session. Additional metadata belongs in the connector, where we avoid contention on CdcCacheRw.
I could see moving replState/replConn out of the connector, then storing a struct { replState, replConn, connector } as the value of the hashmap (see the sketch below). But for now putting it all in the connector works about the same.
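A minimal sketch of that alternative map value; the field types are stand-ins, since the real ones live in the connector package:

```go
// Hypothetical hashmap value if replState/replConn were hoisted out of the
// connector; the cache would then map flow job name -> this entry.
type cdcCacheEntry struct {
	replState any // replication progress/offset state
	replConn  any // the long-lived postgres replication connection
	connector any // the source connector itself
}
```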
force-pushed from c30bd20 to 8124726
Required updating connector interfaces. Bit annoying threading `ctx` everywhere, but that's ultimately the correct way. Was running into context complications in #1211 with the connector being shared between activities. Putting a context in a struct essentially makes that struct a context, but it is not necessarily the context we want. For more context, see https://zenhorace.dev/blog/context-control-go

Some changes were made (the pattern is illustrated after this list):
1. GetCatalog takes a context now instead of using `context.Background()`
2. eventhubs processBatch now takes a context instead of using `context.Background()`
3. many instances of `Query`/`Exec` in snowflake/clickhouse converted to `QueryContext`/`ExecContext`
4. got rid of the cancel context in the ssh tunnel, the context being passed in is sufficient

Followup to #1238
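A small illustration of the pattern behind changes 1-3, using database/sql; the type and methods are made up for the example:

```go
import (
	"context"
	"database/sql"
)

type conn struct{ db *sql.DB }

// Before (pattern being removed): context.Background() can't be cancelled,
// so a stuck query outlives its caller.
func (c *conn) rowCountOld(table string) (int, error) {
	var n int
	err := c.db.QueryRowContext(context.Background(), "SELECT count(*) FROM "+table).Scan(&n)
	return n, err
}

// After: the caller's context flows through, so cancellation propagates.
func (c *conn) rowCount(ctx context.Context, table string) (int, error) {
	var n int
	err := c.db.QueryRowContext(ctx, "SELECT count(*) FROM "+table).Scan(&n)
	return n, err
}
```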
force-pushed from 7558024 to ff908e5
force-pushed from 4ccc9dd to 7583072
does using …
Recreation will have a new RunID. OriginalRunID is used because RunID changes with replays; OriginalRunID maintains determinism. https://pkg.go.dev/go.temporal.io/[email protected]/internal#WorkflowInfo
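For reference, a snippet showing where `OriginalRunID` comes from in the Go SDK; using it as the stable correlation key is the idea described above, though this exact usage is only a sketch:

```go
import "go.temporal.io/sdk/workflow"

func cdcFlow(ctx workflow.Context) error {
	info := workflow.GetInfo(ctx)
	// info.WorkflowExecution.RunID can change, while info.OriginalRunID stays
	// fixed, making it the deterministic choice for correlating workflows.
	workflow.GetLogger(ctx).Info("correlation key", "originalRunID", info.OriginalRunID)
	return nil
}
```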
force-pushed from e6857ea to 5bdece2
force-pushed from 92a9064 to 9010c35
Fewer side effects, less error handling; can correlate different workflows with the same run id. Also pulls in some other cleanup from #1211
force-pushed from 9010c35 to 265fa5c
Only pass config & options to StartFlow, removing StartFlowInput. In fact, while we're at it, rename StartFlow to SyncFlow: the name doesn't really make sense anymore, & it'll make even less sense after #1211
force-pushed from 265fa5c to 0d21eb8
Currently we reconnect with each sync flow, repeatedly restarting replication. This can take an exceedingly long time for some workloads on some databases.
Fix: use a temporal session to share state between activities, use a single source connector throughout the cdc flow, & move the replication connection back into the source connector
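A hedged sketch of the fix's mechanism: a Temporal session pins activities to one worker process, so in-memory state like a cached source connector can be shared across them. Activity names and timeouts below are placeholders, not PeerDB's actual ones:

```go
import (
	"time"

	"go.temporal.io/sdk/workflow"
)

func cdcFlow(ctx workflow.Context) error {
	// All activities scheduled on sessionCtx run on the same worker,
	// so they can share the cached replication connector.
	sessionCtx, err := workflow.CreateSession(ctx, &workflow.SessionOptions{
		CreationTimeout:  time.Minute,
		ExecutionTimeout: 14 * 24 * time.Hour, // placeholder for a long-lived CDC session
	})
	if err != nil {
		return err
	}
	defer workflow.CompleteSession(sessionCtx)

	sessionCtx = workflow.WithActivityOptions(sessionCtx, workflow.ActivityOptions{
		StartToCloseTimeout: 24 * time.Hour, // placeholder
		HeartbeatTimeout:    time.Minute,
	})

	// "SetupSource" would open the replication connection and cache the
	// connector in worker memory; "SyncFlow" reuses it on every iteration
	// instead of reconnecting.
	if err := workflow.ExecuteActivity(sessionCtx, "SetupSource").Get(sessionCtx, nil); err != nil {
		return err
	}
	for { // a real workflow would eventually continue-as-new
		if err := workflow.ExecuteActivity(sessionCtx, "SyncFlow").Get(sessionCtx, nil); err != nil {
			return err
		}
	}
}
```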